Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attempting to fix racy Akka.Persistence.TestKit.Tests #7068

Open
wants to merge 34 commits into
base: dev
Choose a base branch
from

Conversation

Aaronontheweb
Copy link
Member

@Aaronontheweb Aaronontheweb commented Jan 19, 2024

Changes

Trying to fix a pair of specs with high flip rates in the Akka.NET test suite.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

@Aaronontheweb
Copy link
Member Author

[INFO][01/19/2024 15:34:01.299Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/19/2024 15:34:01.299Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/19/2024 15:34:01.438Z][Thread 0012][akka://test/user/counter] Starting up
[INFO][01/19/2024 15:34:06.494Z][Thread 0012][akka://test/user/counter] Received recover Akka.Persistence.RecoveryCompleted
[INFO][01/19/2024 15:34:06.496Z][Thread 0012][akka://test/user/counter] Received command inc
[DEBUG][01/19/2024 15:34:06.500Z][Thread 0037][ActorSystem(test)] System shutdown initiated

Took ~5 seconds to complete recovery with the Akka.Persistence.TestKit journal? Something's off then.

try
{
var highSequenceNr = await _breaker.WithCircuitBreaker((message, readHighestSequenceNrFrom, awj: this), state =>
state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom));
state.awj.ReadHighestSequenceNrAsync(state.message.PersistenceId, state.readHighestSequenceNrFrom))
.ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if there's some issues with the Akka.Persistence Journal capturing context here, since these tasks run as detatched. I doubt it's the issue but it's one of the things that stood out to me here.

Copy link
Contributor

@Arkatufus Arkatufus Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task isn't detached though, its actually awaited inside the circuit breaker, except when the circuit breaker is open, in which it just throws an open circuit exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true - we have some other racy CircuitBreaker specs too (independently from any Akka.Persistence plugin) - wonder if that's the common issue?

Copy link
Contributor

@Arkatufus Arkatufus Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not the CircuitBreaker, the giant lag on recovery is between PreStart and when the LoadSnapshot message was received by the snapshot store actor:

[INFO][02/01/2024 18:30:23.919Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 18:30:23.940Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 18:30:23.999Z][Thread 0005][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 18:30:24.041Z][Thread 0012][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 18:30:29.807Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 18:30:29.809Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 18:30:29.810Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 18:30:29.812Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 18:30:29.812Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 18:30:29.814Z][Thread 0012][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 18:30:29.838Z][Thread 0012][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>
[INFO][02/01/2024 18:30:29.839Z][Thread 0012][akka://test/user/$b] Received recover Akka.Persistence.RecoveryCompleted

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think that the bug is actually in the RecoveryPermitter class

@Aaronontheweb
Copy link
Member Author

Well this is puzzling, but reflects the problem using logging statements:

[INFO][01/23/2024 22:19:51.557Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/23/2024 22:19:51.557Z][Thread 0017][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/23/2024 22:19:51.592Z][Thread 0011][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[ERROR][01/23/2024 22:19:55.451Z][Thread 0013][Bug4762FixSpec (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Persistence.RecoveryCompleted 
   at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
   at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args) in /home/vsts/work/1/s/src/contrib/testkits/Akka.TestKit.Xunit2/XunitAssertions.cs:line 26
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`2 assert, String hint, CancellationToken cancellationToken, Boolean shouldLog) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 436
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 411
   at Akka.TestKit.TestKitBase.InternalExpectMsgAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in /home/vsts/work/1/s/src/core/Akka.TestKit/TestKitBase_Expect.cs:line 379
   at Akka.Persistence.TestKit.Tests.Bug4762FixSpec.<>c__DisplayClass5_0.<<TestJournal_PersistAll_should_only_count_each_event_exceptions_once>b__1>d.MoveNext() in /home/vsts/work/1/s/src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs:line 93
--- End of stack trace from previous location ---
   at Akka.Persistence.TestKit.PersistenceTestKit.WithJournalWrite(Func`2 behaviorSelector, Func`1 execution) in /home/vsts/work/1/s/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs:line 143
[INFO][01/23/2024 22:19:55.452Z][Thread 0013][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/23/2024 22:19:56.467Z][Thread 0035][ActorSystem(test)] System shutdown initiated
[INFO][01/23/2024 22:19:56.476Z][Thread 0035][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/23/2024 22:19:56.476Z][Thread 0035][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop

@Aaronontheweb
Copy link
Member Author

When that same test passes, the logs look like this:

[INFO][01/24/2024 00:02:36.465Z][Thread 0010][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/24/2024 00:02:36.484Z][Thread 0010][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/24/2024 00:02:36.536Z][Thread 0031][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][01/24/2024 00:02:36.593Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 00:02:36.594Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 00:02:36.638Z][Thread 0034][akka://test/user/$b] Received recover Akka.Persistence.RecoveryCompleted
[INFO][01/24/2024 00:02:36.665Z][Thread 0034][akka://test/user/$b] Received command Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage
[INFO][01/24/2024 00:02:36.704Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Beginning write intercept of message Persistent<pid: foo, seqNr: 1, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.704Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Completed write intercept of message Persistent<pid: foo, seqNr: 1, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.705Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Beginning write intercept of message Persistent<pid: foo, seqNr: 2, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:36.705Z][Thread 0033][akka://test/system/akka.persistence.journal.test] Completed write intercept of message Persistent<pid: foo, seqNr: 2, deleted: False, manifest: , sender: , payload: Akka.Persistence.TestKit.Tests.Bug4762FixSpec+TestEvent, writerGuid: b53d40b9-788c-491d-b676-9ae7dd3fd287> with interceptor Noop
[INFO][01/24/2024 00:02:39.722Z][Thread 0030][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/24/2024 00:02:39.781Z][Thread 0034][ActorSystem(test)] System shutdown initiated
[DEBUG][01/24/2024 00:02:39.831Z][Thread 0014][EventStream] Shutting down: StandardOutLogger started
[DEBUG][01/24/2024 00:02:39.831Z][Thread 0014][EventStream] All default loggers stopped

@Aaronontheweb
Copy link
Member Author

Akka.Persistence.TestKit.Tests.Bug4762FixSpec.TestJournal_PersistAll_should_only_count_each_event_exceptions_once is still failing with

[INFO][01/24/2024 02:01:24.467Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][01/24/2024 02:01:24.467Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][01/24/2024 02:01:24.565Z][Thread 0025][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[ERROR][01/24/2024 02:01:27.720Z][Thread 0036][Bug4762FixSpec (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Persistence.RecoveryCompleted 
   at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
   at Akka.TestKit.Xunit2.XunitAssertions.Fail(String format, Object[] args) in D:\a\1\s\src\contrib\testkits\Akka.TestKit.Xunit2\XunitAssertions.cs:line 26
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`2 assert, String hint, CancellationToken cancellationToken, Boolean shouldLog) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 436
   at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelopeAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 411
   at Akka.TestKit.TestKitBase.InternalExpectMsgAsync[T](Nullable`1 timeout, Action`1 msgAssert, Action`1 senderAssert, String hint, CancellationToken cancellationToken) in D:\a\1\s\src\core\Akka.TestKit\TestKitBase_Expect.cs:line 379
   at Akka.Persistence.TestKit.Tests.Bug4762FixSpec.<>c__DisplayClass5_0.<<TestJournal_PersistAll_should_only_count_each_event_exceptions_once>b__1>d.MoveNext() in D:\a\1\s\src\core\Akka.Persistence.TestKit.Tests\Bug4762FixSpec.cs:line 93
--- End of stack trace from previous location ---
   at Akka.Persistence.TestKit.PersistenceTestKit.WithJournalWrite(Func`2 behaviorSelector, Func`1 execution) in D:\a\1\s\src\core\Akka.Persistence.TestKit.Xunit2\PersistenceTestKit.cs:line 143
[INFO][01/24/2024 02:01:27.720Z][Thread 0036][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[DEBUG][01/24/2024 02:01:29.754Z][Thread 0037][ActorSystem(test)] System shutdown initiated
[INFO][01/24/2024 02:01:29.770Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][01/24/2024 02:01:29.770Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop

@Aaronontheweb
Copy link
Member Author

So what is blocking the TestSnapshotStore from running in this case? Why does it only begin recovery as the ActorSystem is shutting down in these racy failure cases?

@Arkatufus
Copy link
Contributor

So what is blocking the TestSnapshotStore from running in this case? Why does it only begin recovery as the ActorSystem is shutting down in these racy failure cases?

That is a red herring, something is blocking the persistent actor under test thread right before recovery even started.

These are logs taken from 2 different tests, one is from Bug4762FixSpec and the other is from the CounterActorTests. I deliberately increased the Bug4762FixSpec timeout value to 10 seconds to see if the failure was caused by a lag or a total failure:

Bug4762FixSpec

[INFO][02/01/2024 23:26:52.474Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 23:26:52.474Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 23:26:52.585Z][Thread 0025][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 23:26:52.598Z][Thread 0008][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 23:26:52.599Z][Thread 0008][akka://test/user/$b] Stashing [Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/$b#1976573591]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/$b#1976573591]
[INFO][02/01/2024 23:26:52.603Z][Thread 0008][akka://test/user/$b] Recovery granted
[INFO][02/01/2024 23:26:54.112Z][Thread 0008][akka://test/user/$b] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 23:26:54.112Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 23:26:54.113Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 23:26:54.116Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 23:26:54.117Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 23:26:54.118Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 23:26:54.120Z][Thread 0008][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 23:26:54.159Z][Thread 0008][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>
[INFO][02/01/2024 22:33:44.671Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 22:33:44.673Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 22:33:44.679Z][Thread 0012][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 22:33:44.736Z][Thread 0012][akka://test/user/$b] Starting up and beginning recovery
[INFO][02/01/2024 22:33:44.736Z][Thread 0012][akka://test/user/$b] Stashing [Akka.Persistence.TestKit.Tests.Bug4762FixSpec+WriteMessage]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/$b#1999959018]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/$b#1999959018]
[INFO][02/01/2024 22:33:44.758Z][Thread 0005][akka://test/user/$b] Recovery granted
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/user/$b] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot foo loading using interceptor Noop
[INFO][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[DEBUG][02/01/2024 22:33:48.653Z][Thread 0005][akka://test/system/akka.persistence.journal.test/$a] Replay completed: RecoverySuccess<highestSequenceNr: 0>

CounterActorTests

[INFO][02/01/2024 23:26:54.209Z][Thread 0015][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 23:26:54.209Z][Thread 0015][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 23:26:54.217Z][Thread 0013][akka://test/system/akka.persistence.journal.test] Using write interceptor Failure
[INFO][02/01/2024 23:26:54.220Z][Thread 0013][ActorSystem(test)] Messaging actor
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Starting up
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Stashing [inc]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/counter#459201627]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/counter#459201627]
[INFO][02/01/2024 23:26:54.226Z][Thread 0013][akka://test/user/counter] Recovery granted
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/user/counter] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Starting LoadSnapshotAsync circuit breaker.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Invoking LoadAsync inside circuit breaker.
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Starting to intercept snapshot test loading using interceptor Noop
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] Completed intercept snapshot test loading using interceptor Noop
[INFO][02/01/2024 23:26:55.130Z][Thread 0013][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshotAsync circuit breaker completed.
[INFO][02/01/2024 23:26:55.131Z][Thread 0013][akka://test/user/counter] Received recover Akka.Persistence.RecoveryCompleted
[INFO][02/01/2024 21:48:16.654Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting journal plugin `akka.persistence.journal.test`
[INFO][02/01/2024 21:48:16.656Z][Thread 0016][PersistenceExtension (akka://test)] Auto-starting snapshot store `akka.persistence.snapshot-store.test`
[INFO][02/01/2024 21:48:16.661Z][Thread 0032][akka://test/system/akka.persistence.journal.test] Using write interceptor Failure
[INFO][02/01/2024 21:48:16.689Z][Thread 0032][ActorSystem(test)] Messaging actor
[INFO][02/01/2024 21:48:16.698Z][Thread 0007][akka://test/user/counter] Starting up
[INFO][02/01/2024 21:48:16.698Z][Thread 0007][akka://test/user/counter] Stashing [inc]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/system/recoveryPermitter] RequestRecoveryPermit received: [akka://test/user/counter#79486081]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/system/recoveryPermitter] Recovery granted: [akka://test/user/counter#79486081]
[INFO][02/01/2024 21:48:16.700Z][Thread 0032][akka://test/user/counter] Recovery granted
[ERROR][02/01/2024 21:48:21.633Z][Thread 0035][CounterActorTests (akka://test)] Error during execution of WithJournalWrite
Cause: Xunit.Sdk.FailException: Failed: Timeout 00:00:03 while waiting for a message of type Akka.Actor.Terminated Terminated [akka://test/user/counter#79486081]. 
   at Xunit.Assert.Fail(String message) in /_/src/xunit.assert/Asserts/FailAsserts.cs:line 34
<SNIP>
   at Xunit.Sdk.ExceptionAggregator.RunAsync(Func`1 code) in /_/src/xunit.core/Sdk/ExceptionAggregator.cs:line 90
[INFO][02/01/2024 21:48:21.634Z][Thread 0035][akka://test/system/akka.persistence.journal.test] Using write interceptor Noop
[INFO][02/01/2024 21:48:21.654Z][Thread 0032][akka://test/user/counter] Telling SnapshotStore to load snapshot
[INFO][02/01/2024 21:48:21.654Z][Thread 0032][akka://test/system/akka.persistence.snapshot-store.test] LoadSnapshot message received.

Observations

  • Note the logs timestamp before the "Recovery granted" log message from the test actor
  • Note the logs timestamp after the "Telling SnapshotStore to load snapshot" log message
  • They are all measured in miliseconds and sub miliseconds, which is what we expect on a properly behaving system
  • Note the time it took between the "Recovery granted" and "Telling SnapshotStore to load snapshot" log message
  • Notice that there is a noticeable 1-4 seconds lag between these two lines. On a very bad day, this lag can extend to up to 7 seconds
  • These logs are taken from a slightly modified "Eventsourced.Lifecycle.cs" file:
    image
  • These anomalies only happened on the build server, both me and Aaron could not see them when we're running the tests locally on our machines

@Arkatufus
Copy link
Contributor

Note that there are no async-await between the two log lines, only a simple code.

The ChangeState(RecoveryStarted(recovery.ReplayMax)); code and be simplified even further by rewriting it to _currentState = RecoveryStarted(recovery.ReplayMax);

The RecoveryStarted() method returns a simple POCO class EventsourcedState that held a state, function delegate, and function expression to do message handling. The only computationally heavy code inside the RecoveryStarted() method is the code that retrieves the timeout value from within the HOCON configuration.

@Arkatufus
Copy link
Contributor

Changing the actor dispatcher to "akka.actor.internal-dispatcher" eliminates the start-up delay, could it be that there's something wrong inside "akka.actor.default-dispatcher" implementation?

@Aaronontheweb
Copy link
Member Author

Changing the actor dispatcher to "akka.actor.internal-dispatcher" eliminates the start-up delay, could it be that there's something wrong inside "akka.actor.default-dispatcher" implementation?

We implemented this in #7117 - let's see how it does

@Aaronontheweb
Copy link
Member Author

@Arkatufus looks to me like the dispatcher changes didn't have any impact

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants